package pku;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;

/**
 * 这是一个简单的基于内存的实现，以方便选手理解题意；
 * 实际提交时，请维持包名和类名不变，把方法实现修改为自己的内容；
 * ## 此版本是一个内存实现的消息队列
 * 为了方便大家理解题目,  通过这个类你可以了解到测评程序的运行逻辑
 * # 编程目标
 * 在DefaultQueueStoreImpl中实现QueueStore接口的get和put方法，不用改变代码目录结构，在pku包下面编写代码
 */
public class DefaultQueueStoreImpl extends QueueStore {

    public static final String dirfile = "data/"; //文件夹
    public static Collection<byte[]> EMPTY = new ArrayList<>(); //用于空的返回
    //定义三个常量：文件通道、写的位置、队列映射；每个都对应多个队列
    private FileChannel channel; //使用NIO进行读写操作
    private AtomicLong wrotePosition;  //用于定位写入文件内容的位置
    private ConcurrentHashMap<String, Queue> queueMap;  //用于缓存  注意这里的第二个参数是Queue类型，
    public DefaultQueueStoreImpl() {  //构造函数用于初始化 常量、创建文件
        wrotePosition = new AtomicLong(0L);  //初始时为0  每个文件都有自己的写入位置
        RandomAccessFile file = null; //这个类可以随机访问文件，
        try {
            File dir = new File("data");
            if (!dir.exists()) {
                dir.mkdirs();
            }
            file = new RandomAccessFile(dirfile + "file" + ".txt", "rw"); //通过RandomAccessFile获得FileChannel
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        channel = file.getChannel();  //获得文件对应的FileChannel
        queueMap = new ConcurrentHashMap<>();
    }
    public void put(String queueName, byte[] message) { //调用写函数
        Queue queue;  //创建一个queue类的对象
        queue = queueMap.get(queueName); //当一个队列来了之后，先去queuemap中查找对应的queue,没有当前值时直接返回null
        if (queue == null) {  //第一个数据来，这里是空
            synchronized (this) {
                // 双重检测  似乎没啥用，试试删除会怎么样
                queue = queueMap.get(queueName);
            if (queue == null) {
                queue = new Queue(channel, wrotePosition); //每个queue都有自己的通道数组和偏移量数组
                queueMap.put(queueName, queue); //建立queuemap映射
            }
            }
        }
        queue.put(message); //就消息放入queue对象中
    }
    public Collection<byte[]> get(String queueName, long offset, long num) {
        Queue queue = queueMap.get(queueName);  //根据索引找到对应的queue对象
        if (queue == null) {
            return EMPTY;
        }
        return queue.get(offset, num);  //返回消息
    }
}

class Queue {  //完成每个queue对应消息的put 和 get

    private FileChannel channel;  //可以理解为文件
    private AtomicLong wrotePosition; //可以理解为文件写入的位置
    public Queue(FileChannel channel, AtomicLong wrotePosition) { //初始化
        this.channel = channel;
        this.wrotePosition = wrotePosition;
    }
    public final static int SINGLE_MESSAGE_SIZE = 60;  //每条消息的大小
    public final static int BLOCK_SIZE = 50;    //一个存储块存放消息的数量 共2k*1000*1000=2g
    private int queueLength = 0;  //单个队列的总消息数
    private volatile boolean firstPut = true;
    private int queueIndex;
    public final static int bufferSize = SINGLE_MESSAGE_SIZE * BLOCK_SIZE;// 缓冲区大小
    private ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);// 读写缓冲区 开销在JVM之外，也就是直接分配到内存中
    private static final byte FILL_BYTE = (byte) 0; //用于填充数据

    public void put(byte[] message) {  //put 由评测程序保证了 queue 级别的同步
        if (firstPut) {
            this.queueIndex = 0;
            firstPut = false;
        }
        if(message.length > buffer.remaining() - 16){
            flushput();  //将数据写入到磁盘，然后再放入到缓存,
        }
        int len = message.length;
        String strlen = String.valueOf(len);
        byte[] messlen = strlen.getBytes();
        byte[] messlenres = new byte[16];
        for (int i = 0; i < 16; i++) {
            if (i < messlen.length) {
                messlenres[i] = messlen[i];
            } else {
                messlenres[i] = FILL_BYTE; //对于多余的数据进行填充
            }
        }
        buffer.put(messlenres);
        buffer.put(message); // 将消息放入缓存  将当前position的值置为message，并将position值+1
        this.queueLength++; //每个队列的消息总数
    }
    private static final int size = 50;  //200 / BLOCK_SIZE + 1
    private Future<Long> flushFuture;  //用来传递上一个物理块的起始偏移量
    private long offsets[] = new long[size]; // 记录该块在物理文件中的起始偏移量
    private int blockSize = 0;  //用于记录当前的块数
    private static ExecutorService flushThread = Executors.newSingleThreadExecutor();  //创建一个单线程化的线程池 singleThreadExecutor
    private static ByteBuffer flushBuffer = ByteBuffer.allocateDirect(64 * 1024); //64k
    private int queueIndexes[] = new int[size]; // 记录该块中第一个消息的起始消息编号
    private void flushput() {
        if (flushFuture != null) { //刚开始肯定是空的 存的是writePosition
            try {
                offsets[blockSize - 1] = flushFuture.get(); //通过future对象获取现成的返回值
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            flushFuture = null;
        }
        buffer.flip(); // 将Buffer从写模式切换到读模式  postion置零
        int remaining = buffer.remaining(); //返回limit和position之间相对位置差 也即写入的消息数量
        final byte[] message = new byte[remaining]; //赋值到message
        buffer.get(message);  //get()方法返回当前postion所指的数组值，并将position值+1
        flushFuture = flushThread.submit(new Callable<Long>() { //启动线程时会返回一个Future对象：对应块在物理文件中的便宜量
            @Override
            public Long call() throws Exception {
                long writePosition = wrotePosition.getAndAdd(message.length);  //先取当前应该写入的位置，再修改位置，即加上缓存区对应的消息长度
                try {
                    if (flushBuffer.remaining() < message.length) { //消息来了之后先放到buffer,满了之后放到flushBuffer，再写到文件
                        flushBuffer.flip();                         //每个队列一个buffer，
                        channel.write(flushBuffer);  //写入到文件中
                        flushBuffer.clear();  //清空
                    }
                    flushBuffer.put(message);  //如果还没有空，则继续写到flushbuffer缓存中
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return writePosition; //带返回值的异步线程
            }
        });
        buffer.clear(); //buffer已经写完，可以清空
        queueIndexes[blockSize] = this.queueIndex;  //每个队列每块的起始位置的消息下标
        blockSize++;
        if (blockSize > offsets.length * 0.7) {  //当当前物理块下标大于offsets长度的0.7倍时，扩容
            offsets = copyOf(offsets, offsets.length * 2); //以两倍来扩容
            queueIndexes = copyOf(queueIndexes, queueIndexes.length * 2);
        }
        this.queueIndex = queueLength;
    }

    private volatile boolean firstGet = true;
    public synchronized Collection<byte[]> get(long offset, long num) {  //可能有并发读
        if (firstGet) {   //如果第一次读该队列数据，需要先将队列在缓存中还有的数据，写入到磁盘中
            flushForGet();  //如果缓存中还有数据，先写入到磁盘
            firstGet = false;
        }
        if (offset > queueLength - 1) {
            return DefaultQueueStoreImpl.EMPTY;
        }
        int startIndex = (int) offset;
        int endIndex = Math.min(startIndex + (int) num - 1, queueLength - 1);
        List<byte[]> result = new ArrayList<>();
        int startBlock = 0;  //计算大致从哪个块开始，但不会比这个更小
        int endBlock = queueIndexes.length - 1;   //可能在这个块之后才截止
        boolean signstart = false;
        boolean signend = false;
        for(int i = startBlock;i<blockSize;++i){
            if(queueIndexes[i] > startIndex && signstart == false) {
                startBlock = i - 1;
                signstart = true;
            }
            if(queueIndexes[i] > endIndex && signend == false){
                endBlock = i-1;
                signend = true;
                break;
            }
        }
        if(signend == false) endBlock = blockSize - 1;
        if(signstart == false) startBlock = blockSize - 1;
        int numcount = endIndex - startIndex + 1;
        if(startBlock == endBlock){  //说明消息在同一块内，直接读够数量停止
            int blockStartIndex = queueIndexes[startBlock];
            int countreadempty = startIndex - blockStartIndex;
            Long readstart = this.offsets[startBlock];
            buffer.clear();//在一个块内，先读到缓存中，
            try {
                channel.read(buffer, readstart); //从给定的文件位置开始，从该通道读取一个块到buffer中，不够一个块也全部读入buffer
            } catch (IOException e) {  // 通常一个buffer可以放40条消息，
                e.printStackTrace();
            }
            buffer.flip();

            while(countreadempty > 0){ //跳过块内的这些消息，这些消息一定在一个块内，并且连续
                byte[] byteslen = new byte[16];
                buffer.get(byteslen);
                String len = new String(lentruncate(byteslen)); //长度转换 其实并不需要真的去读，只需要读取长度就可以
                int lenlen = Integer.parseInt(len); //这是内容的长度
                byte[] bytes = new byte[lenlen];
                buffer.get(bytes);
                countreadempty--;
            }
            while(numcount > 0){ //读取正文
                byte[] byteslen = new byte[16];
                buffer.get(byteslen);
                String len = new String(lentruncate(byteslen)); //长度转换 其实并不需要真的去读，只需要读取长度就可以
                int lenlen = Integer.parseInt(len);
                byte[] bytes = new byte[lenlen];
                buffer.get(bytes);
                result.add(bytes);
                numcount--;
            }
            return result;
        }
        else{
            for(int i = startBlock;i<endBlock;++i){
                int sizeeachBlock = queueIndexes[i+1] - queueIndexes[i]; //每块共有多少消息
                int blockStartIndex = queueIndexes[i];
                int countreadempty = startIndex - blockStartIndex;
                Long readstart = this.offsets[i];  //这个循环是为了调整readstart的读取位置

                buffer.clear();//在一个块内，先读到缓存中，
                try {
                    channel.read(buffer, readstart); //从给定的文件位置开始，从该通道读取一个块到buffer中，不够一个块也全部读入buffer
                } catch (IOException e) {  // 通常一个buffer可以放40条消息，
                    e.printStackTrace();
                }
                buffer.flip();

                while(countreadempty > 0 && sizeeachBlock > 0){ //跳过块内的这些消息，这些消息一定在一个块内，并且连续
                    byte[] byteslen = new byte[16];
                    buffer.get(byteslen);
                    String len = new String(lentruncate(byteslen)); //长度转换 其实并不需要真的去读，只需要读取长度就可以
                    int lenlen = Integer.parseInt(len); //这是内容的长度
                    byte[] bytes = new byte[lenlen];
                    buffer.get(bytes);
                    countreadempty--;
                    sizeeachBlock--;
                }
                while (sizeeachBlock > 0 && numcount >0){  //这里开始真正的读取内容
                    byte[] byteslen = new byte[16];
                    buffer.get(byteslen);
                    String len = new String(lentruncate(byteslen)); //长度转换 其实并不需要真的去读，只需要读取长度就可以
                    int lenlen = Integer.parseInt(len);
                    byte[] bytes = new byte[lenlen];
                    buffer.get(bytes);
                    result.add(bytes);
                    sizeeachBlock--;
                    numcount--;
                }
            }
            Long readstart = this.offsets[endBlock];
            buffer.clear();//在一个块内，先读到缓存中，
            try {
                channel.read(buffer, readstart); //从给定的文件位置开始，从该通道读取一个块到buffer中，不够一个块也全部读入buffer
            } catch (IOException e) {  // 通常一个buffer可以放40条消息，
                e.printStackTrace();
            }
            buffer.flip();
            while(numcount > 0){ //读取正文  横跨多个块怎么办  还是得知道结束块在哪
                byte[] byteslen = new byte[16];
                buffer.get(byteslen);
                String len = new String(lentruncate(byteslen)); //长度转换 其实并不需要真的去读，只需要读取长度就可以
                int lenlen = Integer.parseInt(len);
                byte[] bytes = new byte[lenlen];
                buffer.get(bytes);
                result.add(bytes);
                numcount--;
            }
            return result;
        }
    }

    private void flushForGet() {
        if (flushFuture != null) {   // 物理地址的起始位置
            try {
                offsets[blockSize - 1] = flushFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            flushFuture = null;
        }
        buffer.flip();
        flushFuture = flushThread.submit(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                long writePosition = wrotePosition.getAndAdd(buffer.remaining());
                try {
                    if (flushBuffer.position() > 0) {  //如果flushbuffer中还有消息，先写入磁盘
                        flushBuffer.flip();
                        channel.write(flushBuffer);
                        flushBuffer.clear();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                channel.write(buffer);  //将buffer中的数据写入到磁盘
                buffer.clear();  //清空buffer
                return writePosition;
            }
        });

        try {
            offsets[blockSize] = flushFuture.get(); //通过future对象获取现成的返回值。执行future.get()，主线程会堵塞，直至当前future线程返回结果
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        queueIndexes[blockSize] = this.queueIndex;
        blockSize++;
    }

    public static int[] copyOf(int[] original, int newLength) {
        int[] copy = new int[newLength];
        System.arraycopy(original, 0, copy, 0,
                Math.min(original.length, newLength)); //
        return copy;
    }

    public static long[] copyOf(long[] original, int newLength) {
        long[] copy = new long[newLength];
        System.arraycopy(original, 0, copy, 0,
                Math.min(original.length, newLength));
        return copy;
    }

    private byte[] lentruncate(byte[] message) {
        int realSize = 0;
        for (int i = 0; i < 16; i++) {
            if (message[i] == FILL_BYTE) {
                realSize = i;
                break;
            }
        }
        return Arrays.copyOf(message, realSize);//修改长度，截止到有效位
    }
}
